/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.testutils;

import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** ZooKeeper test utilities. */
public class ZooKeeperTestUtils {

    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperTestUtils.class);

    /**
     * Creates a configuration to operate in {@link HighAvailabilityMode#ZOOKEEPER}.
     *
     * @param zooKeeperQuorum ZooKeeper quorum to connect to
     * @param fsStateHandlePath Base path for file system state backend (for checkpoints and
     *     recovery)
     * @return A new configuration to operate in {@link HighAvailabilityMode#ZOOKEEPER}.
     */
    public static Configuration createZooKeeperHAConfig(
            String zooKeeperQuorum, String fsStateHandlePath) {

        return configureZooKeeperHA(new Configuration(), zooKeeperQuorum, fsStateHandlePath);
    }

    /**
     * Sets all necessary configuration keys to operate in {@link HighAvailabilityMode#ZOOKEEPER}.
     *
     * @param config Configuration to use
     * @param zooKeeperQuorum ZooKeeper quorum to connect to
     * @param fsStateHandlePath Base path for file system state backend (for checkpoints and
     *     recovery)
     * @return The modified configuration to operate in {@link HighAvailabilityMode#ZOOKEEPER}.
     */
    public static Configuration configureZooKeeperHA(
            Configuration config, String zooKeeperQuorum, String fsStateHandlePath) {

        checkNotNull(config, "Configuration");
        checkNotNull(zooKeeperQuorum, "ZooKeeper quorum");
        checkNotNull(fsStateHandlePath, "File state handle backend path");

        // ZooKeeper recovery mode
        config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
        config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperQuorum);

        int connTimeout = 5000;
        if (runsOnCIInfrastructure()) {
            // The regular timeout is to aggressive for Travis and connections are often lost.
            LOG.info(
                    "Detected CI environment: Configuring connection and session timeout of 30 seconds");
            connTimeout = 30000;
        }

        config.setInteger(HighAvailabilityOptions.ZOOKEEPER_CONNECTION_TIMEOUT, connTimeout);
        config.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, connTimeout);

        // File system state backend
        config.setString(StateBackendOptions.STATE_BACKEND, "FILESYSTEM");
        config.setString(
                CheckpointingOptions.CHECKPOINTS_DIRECTORY, fsStateHandlePath + "/checkpoints");
        config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, fsStateHandlePath + "/recovery");

        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(100));

        return config;
    }

    /** @return true, if a CI environment is detected. */
    public static boolean runsOnCIInfrastructure() {
        return System.getenv().containsKey("CI") || System.getenv().containsKey("TF_BUILD");
    }
}
